package com.atguigu.utils;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

// 注意接口的泛型
public class ClickSource implements SourceFunction<ClickEvent> {
    private boolean running = true;
    private Random random = new Random();
    private String[] userArr = {"Mary", "Bob", "Alice", "John", "Liz"};
    private String[] urlArr = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
    @Override
    public void run(SourceContext<ClickEvent> ctx) throws Exception {
        while (running) {
            ctx.collect(
                    new ClickEvent(
                            userArr[random.nextInt(userArr.length)],
                            urlArr[random.nextInt(urlArr.length)],
                            Calendar.getInstance().getTimeInMillis() // 毫秒级时间戳
                    )
            );
            Thread.sleep(1000L);
        }
    }

    // 命令行：flink cancel JobID
    @Override
    public void cancel() {
        running = false;
    }
}
